package com.ideal.service.hive;

import com.ideal.service.hdfs.HDFSService;
import com.ideal.tools.db.MysqlDBUtils;
import com.ideal.tools.ssh.common.CommonTools;
import com.ideal.tools.ssh.common.PropertyBox;
import com.ideal.tools.ssh.context.ClusterContext;
import com.ideal.tools.ssh.entity.ContextResult;
import com.ideal.tools.ssh.result.LinuxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Created by CC on 2016/7/25.
 */
public class HiveService {
    // HDFS path prefix (loaded from property HIVE_HDFSPATH_PREFIX on each refresh)
    private String hdfsPathPrefix ;
    private static Logger logger = LoggerFactory.getLogger(HiveService.class);

    /**
     * Refreshes the Hive metadata snapshot held in the web database.
     * 1. Refreshes HDFS data first (Hive locations are matched against HDFS paths).
     * 2. Loads the latest metastore rows and the currently stored rows.
     * 3. Reconciles the two sets (insert/update/delete) and reports the outcome
     *    through the supplied context.
     *
     * @param context cluster execution context; a fresh empty context is created
     *                when {@code null} is passed (as done by {@link #main})
     */
    public void refreshHive(ClusterContext context) {
        if(null == context){
            context = new ClusterContext(new HashMap<String, Object>());
        }
        // HDFS info must be current before Hive locations are matched against it.
        new HDFSService().refreshHDFS(context);
        hdfsPathPrefix = PropertyBox.getVal("HIVE_HDFSPATH_PREFIX","");
        List<Map<String, Object>> newHiveList = loadNewHive();
        List<Map<String, Object>> dbHiveList = selectDbHive();
        // userName -> cluster_user.id lookup
        Map<String,String> clusterUserMap = selectClusterUserMap();
        // hdfsPath -> meta_hdfs_info_bak.id lookup
        Map<String,String> hdfsInfoBakMap = selectHdfsInfoBakMap();
        handleCompareResult(newHiveList,dbHiveList,clusterUserMap,hdfsInfoBakMap,context);
    }

    /**
     * Lists the Hive table names belonging to the database named after a user.
     *
     * @param username Hive database name (conventionally the cluster user name)
     * @return table names; empty list for null input or when the query yields nothing
     */
    public List<String> getHiveTableByUserName(String username){
        List<String> rs = new ArrayList<String>();
        if (username == null) {
            return rs;
        }
        // NOTE(review): the DB utility only accepts raw SQL strings, so the value
        // is escaped inline (same convention as insertOrUpdateHiveSql). A
        // parameterized query would be preferable if the utility supports one.
        String sql = "select tbl_name from tbls left join dbs on tbls.db_id = dbs.db_id where dbs.name='"
                + username.replace("'", "\\'") + "'";
        List<Map<String,Object>> rows = MysqlDBUtils.queryHiveDB(sql);
        if (rows == null) {
            return rs;
        }
        for (Map<String,Object> row : rows) {
            Object tblName = row.get("tbl_name");
            // skip null cells instead of raising a NullPointerException
            if (tblName != null) {
                rs.add(tblName.toString());
            }
        }
        return rs;
    }

    /**
     * Loads the cluster_user table into a userName -> id lookup map.
     *
     * @return map keyed by user name; empty when the table has no rows
     */
    private Map<String,String> selectClusterUserMap(){
        final String sql = "select id,userName from cluster_user";
        Map<String,String> result = new HashMap<String, String>();
        List<Map<String , Object>> rows = MysqlDBUtils.queryWebDB(sql);
        if (rows == null || rows.isEmpty()) {
            return result;
        }
        for (Map<String, Object> row : rows) {
            result.put(row.get("userName").toString(), row.get("id").toString());
        }
        return result;
    }

    /**
     * Loads the meta_hdfs_info_bak table into an hdfsPath -> id lookup map.
     *
     * @return map keyed by HDFS path; empty when the table has no rows
     */
    private Map<String,String> selectHdfsInfoBakMap(){
        final String sql = "select hdfsPath,id from meta_hdfs_info_bak";
        Map<String,String> result = new HashMap<String, String>();
        List<Map<String , Object>> rows = MysqlDBUtils.queryWebDB(sql);
        if (rows != null) {
            for (Map<String, Object> row : rows) {
                result.put(row.get("hdfsPath").toString(), row.get("id").toString());
            }
        }
        return result;
    }

    /**
     * Loads the current Hive metadata (database, table and storage-descriptor
     * columns) straight from the Hive metastore tables.
     *
     * @return one row per Hive table; may be null/empty depending on the DB utility
     */
    private List<Map<String, Object>> loadNewHive(){
        // Compile-time string concatenation; no mutable buffer needed here.
        String hql = "select dbs.name,tbls.tbl_name,sds.location,tbls.create_time,dbs.db_id,tbls.tbl_id,tbls.sd_id,tbls.tbl_type,"
                + "sds.serde_id,sds.input_format,sds.output_format,sds.cd_id from dbs,tbls,sds where dbs.db_id=tbls.db_id and tbls.sd_id=sds.sd_id";
        return MysqlDBUtils.queryHiveDB(hql);
    }

    /**
     * Reads the Hive rows currently stored in the web database.
     *
     * @return rows of meta_hive_info; may be null/empty depending on the DB utility
     */
    private List<Map<String, Object>> selectDbHive(){
        // Plain literal; wrapping it in `new String(...)` only created a redundant copy.
        String sql = "SELECT id,dbName,tableName,hdfsInfoBakId,createTime,clusterUserId FROM meta_hive_info";
        return MysqlDBUtils.queryWebDB(sql);
    }

    /**
     * Reconciles the freshly loaded Hive metadata with the rows stored in the
     * web database:
     * 1. inserts new tables that are not yet stored,
     * 2. updates stored rows whose HDFS location changed,
     * 3. deletes stored rows that no longer exist in Hive,
     * 4. truncates meta_hive_sql (the cached DDL is stale after a refresh),
     * and finally reports success/failure through the context.
     *
     * @param newHiveList    current metastore rows (name/tbl_name/location/...)
     * @param dbHiveList     rows currently stored in meta_hive_info
     * @param clusterUserMap userName -> cluster_user.id
     * @param hdfsInfoBakMap hdfsPath -> meta_hdfs_info_bak.id
     * @param context        receives the LinuxResult describing the outcome
     */
    private void handleCompareResult(List<Map<String, Object>> newHiveList,List<Map<String, Object>> dbHiveList,Map<String,String> clusterUserMap,Map<String,String> hdfsInfoBakMap,ClusterContext context ){
        List<LinuxResult> resultList = new ArrayList<LinuxResult>(); // outcome handed back via the context
        if(null == newHiveList || newHiveList.size()==0 ){
            LinuxResult tmpResult = new LinuxResult(null,null,LinuxResult.DEFAULT_FAILD_CODE);
            tmpResult.setNote("刷新失败！");
            resultList.add(tmpResult);
            context.getContextResult().setConextResult("", resultList);
            return;
        }
        // Keep only rows whose user and HDFS location are known.
        // key: dbName + tbl_name, value: the (rewritten) metastore row.
        Map<String,Map<String, Object>> finalHiveMap = new HashMap<String, Map<String, Object>>();
        for (Map<String, Object> cols : newHiveList) {
            String dbName = cols.get("name").toString();
            cols.put("dbName", dbName);
            // Strip the configured prefix with plain substring arithmetic. The
            // previous split(regex)[1] threw ArrayIndexOutOfBoundsException for
            // locations not containing the prefix and misbehaved when the prefix
            // contained regex metacharacters.
            String relativePath = stripHdfsPrefix(cols.get("location").toString());
            if (relativePath == null) {
                continue; // location outside the managed prefix — cannot be matched
            }
            String userId = clusterUserMap.get(dbName);
            String hdfsBakId = hdfsInfoBakMap.get(relativePath);
            if (userId != null && hdfsBakId != null) {
                cols.put("name", userId);        // now holds clusterUserId
                cols.put("location", hdfsBakId); // now holds hdfsInfoBakId
                finalHiveMap.put(dbName + cols.get("tbl_name").toString(), cols);
            }
        }
        // ids of stored rows that vanished from Hive
        List<String[]> deleteDbList = new ArrayList<String[]>();
        // id -> {hdfsInfoBakId, id} update parameters
        Map<String,String[]> updatDbMap = new HashMap<String, String[]>();
        if(null != dbHiveList && dbHiveList.size()>0){
            for (Map<String, Object> curMap: dbHiveList){
                String curMapKey = curMap.get("dbName").toString()+ curMap.get("tableName").toString();
                String curMapHdfsInfoBakId = curMap.get("hdfsInfoBakId").toString();
                String curMapId = curMap.get("id").toString();
                Map<String,Object> matched = finalHiveMap.get(curMapKey);
                if(matched != null){
                    String newBakId = matched.get("location").toString();
                    // Location changed: schedule an update. Parameters are built
                    // directly — the previous join-on-space/split round trip
                    // corrupted values containing spaces.
                    if(!curMapHdfsInfoBakId.equals(newBakId)) {
                        updatDbMap.put(curMapId, new String[]{newBakId, curMapId});
                    }
                    // Matched rows are removed; what remains must be inserted.
                    finalHiveMap.remove(curMapKey);
                }else{
                    deleteDbList.add(new String[]{curMapId});
                }
            }
        }

        // 1. insert rows that had no stored counterpart
        if(finalHiveMap.size() > 0){
            Object[][] insertParam = new Object[finalHiveMap.size()][];
            int count = 0;
            for(Map<String,Object> map : finalHiveMap.values()){
                insertParam[count++] = new Object[]{
                        map.get("dbName").toString(),
                        map.get("tbl_name").toString(),
                        map.get("location").toString(),   // hdfsInfoBakId (rewritten above)
                        map.get("create_time").toString(),
                        map.get("name").toString()        // clusterUserId (rewritten above)
                };
            }
            String insertSql = "insert into meta_hive_info (dbName,tableName,hdfsInfoBakId ,createTime,clusterUserId) values (?,?,?,?,?)";
            MysqlDBUtils.batchWebDB(insertSql, insertParam);
        }
        // 2. update rows whose location changed
        if(updatDbMap.size() > 0){
            Object[][] updateParam = updatDbMap.values().toArray(new Object[updatDbMap.size()][]);
            String updateSql = " update meta_hive_info set hdfsInfoBakId =? where id=?";
            MysqlDBUtils.batchWebDB(updateSql, updateParam);
        }
        // 3. delete rows that vanished from Hive (skip the call when there are none)
        if(deleteDbList.size() > 0){
            Object[][] deleteParam = deleteDbList.toArray(new Object[deleteDbList.size()][]);
            String delectSql = "delete from meta_hive_info where id = ?";
            MysqlDBUtils.batchWebDB(delectSql, deleteParam);
        }
        // 4. cached DDL statements are stale after a refresh
        MysqlDBUtils.updateWebDB("truncate table meta_hive_sql");

        // Report success the same way the failure branch does. The previous code
        // populated a locally created ContextResult that was never attached to the
        // context, so callers never saw the success result when the context
        // already carried a ContextResult.
        LinuxResult tmpResult = new LinuxResult(null,null,LinuxResult.DEFAULT_SUCCESS_CODE);
        tmpResult.setNote("刷新成功！");
        resultList.add(tmpResult);
        ContextResult contextResult = context.getContextResult();
        if(null == contextResult){
            // No result holder on the context; keep the original fallback of a
            // detached ContextResult so behavior toward callers does not change.
            contextResult = new ContextResult();
        }
        contextResult.setConextResult("", resultList);
    }

    /**
     * Returns the part of {@code location} after the configured HDFS path prefix,
     * or {@code null} when the prefix does not occur in the location.
     */
    private String stripHdfsPrefix(String location) {
        if (hdfsPathPrefix == null) {
            return null;
        }
        int idx = location.indexOf(hdfsPathPrefix);
        return idx < 0 ? null : location.substring(idx + hdfsPathPrefix.length());
    }

    /**
     * Manual entry point: runs a full Hive metadata refresh with a default
     * (empty) cluster context.
     */
    public static void main(String[] args) {
        HiveService hiveService = new HiveService();
        hiveService.refreshHive(null);
        //hiveService.handleHiveSql(20);
    }


    /**
     * Builds the CREATE TABLE statement for one stored Hive table and caches it
     * in meta_hive_sql.
     * 1. Looks up dbName/tableName for the given meta_hive_info id.
     * 2. Assembles the full DDL from the metastore tables.
     * 3. Inserts or updates the cached statement in meta_hive_sql.
     *
     * @param hiveId primary key of the meta_hive_info row
     */
    public void handleHiveSql(Long hiveId){
        StringBuffer hql = new StringBuffer("SELECT dbName,tableName from meta_hive_info where id=" + hiveId);
        // NOTE(review): .get(0) throws IndexOutOfBoundsException when the id does
        // not exist — confirm callers only pass known ids.
        Map<String,Object> dbHiveInfo = MysqlDBUtils.queryWebDB(hql.toString()).get(0);
        // NOTE(review): keys are read in upper case although the SELECT uses mixed
        // case — presumably the DB utility upper-cases column labels; verify
        // against MysqlDBUtils.
        String dbName = dbHiveInfo.get("DBNAME").toString();
        String tableName = dbHiveInfo.get("TABLENAME").toString();
        // accumulates the generated DDL
        StringBuffer resHql = new StringBuffer();
        // assemble all DDL fragments (hql is reused as a scratch buffer throughout)
        packageSqlParam(hql,resHql,dbName,tableName);
        // persist the generated statement
        insertOrUpdateHiveSql(hiveId, hql, resHql);
    }

    /**
     * Assembles all fragments of the CREATE TABLE statement for one table.
     * The {@code hql} buffer is a scratch buffer that every helper resets
     * ({@code setLength(0)}) before use; the generated DDL accumulates in
     * {@code resHql}.
     *
     * @param hql       reusable query scratch buffer
     * @param resHql    receives the generated DDL
     * @param dbName    Hive database name
     * @param tableName Hive table name
     */
    public void packageSqlParam(StringBuffer hql,StringBuffer resHql,String dbName,String tableName){
        hql.setLength(0);
        hql.append(" select tbls.tbl_id,tbls.sd_id,sds.serde_id,sds.cd_id ,sds.input_format,sds.output_format,sds.location from  dbs,tbls,sds ");
        hql.append(" where dbs.db_id=tbls.db_id and tbls.sd_id =sds.sd_id and dbs.name ='" + dbName + "' and tbls.tbl_name='" + tableName + "'");
        // NOTE(review): dbName/tableName are concatenated into the SQL unescaped;
        // they originate from the web DB, but escaping (or a parameterized query)
        // would be safer. Also .get(0) throws if the table vanished from the
        // metastore between the lookup and this query.
        Map<String,Object> hiveDbHiveInfo = MysqlDBUtils.queryHiveDB(hql.toString()).get(0);
        String tblId = "", serdeId = "", cd_id = "", location = "", inputFormat = "", outputFormat = "";
        // NOTE(review): upper-case keys — presumably the DB utility upper-cases
        // column labels; verify against MysqlDBUtils.
        tblId = hiveDbHiveInfo.get("TBL_ID").toString();
        serdeId = hiveDbHiveInfo.get("SERDE_ID").toString();
        cd_id = hiveDbHiveInfo.get("CD_ID").toString();
        location = hiveDbHiveInfo.get("LOCATION").toString();
        inputFormat = hiveDbHiveInfo.get("INPUT_FORMAT").toString();
        outputFormat = hiveDbHiveInfo.get("OUTPUT_FORMAT").toString();
        // CREATE [EXTERNAL] TABLE header (also returns table_params for later reuse)
        List<Map<String, Object>> resTab = getQueryTable(tableName, tblId, hql, resHql);
        // column list
        getQueryCols(cd_id, hql, resHql);
        // PARTITIONED BY clause
        getQueryPartition(tblId, hql, resHql);
        // ROW FORMAT DELIMITED clauses
        getQuerySerde(serdeId, hql, resHql);
        // STORED AS input/output format and LOCATION
        getQueryInOuLocation(location, inputFormat, outputFormat, resHql);
        // TBLPROPERTIES clause
        getQueryTblpro(resHql, resTab);
    }

    /**
     * Appends the "CREATE [EXTERNAL] TABLE 'name'" header to {@code resultSql}
     * and returns the table_params rows (reused later for TBLPROPERTIES).
     *
     * @param tableName table to generate DDL for
     * @param tblId     metastore TBL_ID of the table
     * @param hql       scratch buffer, reset before use
     * @param resultSql receives the DDL header
     * @return the table's table_params rows
     */
    private List<Map<String, Object>> getQueryTable(String tableName, String tblId, StringBuffer hql, StringBuffer resultSql) {
        hql.setLength(0);
        hql.append(" select param_key,param_value from table_params where tbl_id='" + tblId + "'");
        List<Map<String, Object>> resTab = MysqlDBUtils.queryHiveDB(hql.toString());
        String tableType = "";
        for (Map<String, Object> tab : resTab) {
            // an EXTERNAL param key marks the table as an external table
            if (null != tab.get("PARAM_KEY") && tab.get("PARAM_KEY").toString().contains("EXTERNAL")) {
                tableType = "EXTERNAL ";
                break; // one match is enough; no need to scan the remaining params
            }
        }
        resultSql.append("CREATE " + tableType + "TABLE '" + tableName.trim() + "'");
        return resTab;
    }

    /**
     * Appends the "(col type, ...)" column list for the given column descriptor
     * to {@code resultSql}; appends nothing when the descriptor has no columns.
     *
     * @param cd_id     metastore CD_ID of the table's column descriptor
     * @param hql       scratch buffer, reset before use
     * @param resultSql receives the column list
     */
    private void getQueryCols(String cd_id, StringBuffer hql, StringBuffer resultSql) {
        hql.setLength(0);
        hql.append(" select column_name,type_name from columns_v2 where cd_id='" + cd_id + "' ");
        List<Map<String, Object>> resCols = MysqlDBUtils.queryHiveDB(hql.toString());
        if (!resCols.isEmpty()) {
            // StringBuilder: no synchronization needed for a method-local buffer.
            StringBuilder tem = new StringBuilder();
            for (Map<String, Object> cols : resCols) {
                if (tem.length() > 0) {
                    tem.append(","); // separator between entries, none after the last
                }
                tem.append("'").append(cols.get("COLUMN_NAME")).append("' ").append(cols.get("TYPE_NAME"));
            }
            resultSql.append(" (").append(tem).append(")\n");
        }
    }

    /**
     * Appends the "PARTITIONED BY (key type, ...)" clause for the given table to
     * {@code resHql}; appends nothing when the table is not partitioned.
     */
    private void getQueryPartition(String tblId, StringBuffer hql, StringBuffer resHql) {
        hql.setLength(0);
        hql.append("  select pkey_name,pkey_type from partition_keys  where tbl_id='" + tblId + "'");
        List<Map<String, Object>> partitionRows = MysqlDBUtils.queryHiveDB(hql.toString());
        if (partitionRows.isEmpty()) {
            return;
        }
        StringBuffer joined = new StringBuffer();
        for (int i = 0; i < partitionRows.size(); i++) {
            Map<String, Object> row = partitionRows.get(i);
            joined.append("'").append(row.get("PKEY_NAME")).append("' ").append(row.get("PKEY_TYPE"));
            if (i < partitionRows.size() - 1) {
                joined.append(","); // comma between entries, none after the last
            }
        }
        resHql.append("PARTITIONED BY (").append(joined).append(")\n");
    }

    /**
     * Appends the "ROW FORMAT DELIMITED ..." clauses built from serde_params to
     * {@code resHql}; appends nothing when the serde has no parameters.
     */
    private void getQuerySerde(String serdeId, StringBuffer hql, StringBuffer resHql) {
        hql.setLength(0);
        hql.append(" select param_key,param_value from serde_params  where serde_id='" + serdeId + "'  order by serde_id");
        List<Map<String, Object>> serdeRows = MysqlDBUtils.queryHiveDB(hql.toString());
        if (serdeRows.isEmpty()) {
            return;
        }
        StringBuffer clauses = new StringBuffer();
        int remaining = serdeRows.size();
        for (Map<String, Object> row : serdeRows) {
            clauses.append(convertDelimit(row.get("PARAM_KEY").toString()))
                   .append(" ")
                   .append(CommonTools.convertString(row.get("PARAM_VALUE").toString()));
            remaining--;
            if (remaining > 0) {
                clauses.append(","); // comma between clauses, none after the last
            }
            clauses.append("\n");
        }
        resHql.append("ROW FORMAT DELIMITED").append("\n").append(clauses);
    }
    /**
     * Maps a serde_params PARAM_KEY prefix to the matching DDL delimiter clause;
     * returns the empty string for unrecognized keys.
     */
    private String convertDelimit(String val) {
        String[][] prefixToClause = {
                {"field", "FIELDS TERMINATED BY "},
                {"line", "LINES TERMINATED BY "},
                {"serialization", "SERIALIZATION TERMINATED BY "},
        };
        for (String[] mapping : prefixToClause) {
            if (val.startsWith(mapping[0])) {
                return mapping[1];
            }
        }
        return "";
    }

    /**
     * Appends the "STORED AS INPUTFORMAT ... OUTPUTFORMAT ..." and "LOCATION ..."
     * clauses to {@code resHql}; appends nothing when all three values are empty.
     *
     * @param location     HDFS location of the table (may be "")
     * @param inputFormat  input format class name (may be "")
     * @param outputFormat output format class name (may be "")
     * @param resHql       receives the generated clauses
     */
    private void getQueryInOuLocation(String location, String inputFormat, String outputFormat, StringBuffer resHql) {
        if (!"".equals(inputFormat) || !"".equals(outputFormat) || !"".equals(location)) {
            resHql.append("STORED AS ");
        }
        if (!"".equals(inputFormat)) {
            resHql.append("INPUTFORMAT ").append(inputFormat).append("\n");
        }
        if (!"".equals(outputFormat)) {
            // The HiveQL keyword is OUTPUTFORMAT; the previous "OUTPUT_FORMAT" was
            // the metastore column name, not valid DDL, and was inconsistent with
            // the INPUTFORMAT branch above.
            resHql.append("OUTPUTFORMAT ").append(outputFormat).append("\n");
        }
        if (!"".equals(location)) {
            resHql.append("LOCATION ").append(location).append("\n");
        }
    }

    /**
     * Appends the "TBLPROPERTIES ('key' value, ...)" clause built from the
     * table_params rows to {@code resHql}; appends nothing when there are none.
     */
    private void getQueryTblpro(StringBuffer resHql, List<Map<String, Object>> resTab) {
        if (resTab.isEmpty()) {
            return;
        }
        StringBuffer props = new StringBuffer();
        Iterator<Map<String, Object>> it = resTab.iterator();
        while (it.hasNext()) {
            Map<String, Object> row = it.next();
            props.append("'").append(row.get("PARAM_KEY")).append("' ").append(row.get("PARAM_VALUE"));
            if (it.hasNext()) {
                props.append(","); // comma between entries, none after the last
            }
        }
        resHql.append("TBLPROPERTIES (").append(props).append(")");
    }

    /**
     * Inserts the generated DDL into meta_hive_sql, or updates the existing row
     * when one is already cached for the given meta_hive_info id.
     *
     * @param hiveId meta_hive_info primary key the DDL belongs to
     * @param hql    scratch buffer, reset and reused for the queries
     * @param resHql the generated CREATE TABLE statement
     */
    private void insertOrUpdateHiveSql(Long hiveId, StringBuffer hql, StringBuffer resHql) {
        hql.setLength(0);
        hql.append(" SELECT hiveSql from meta_hive_sql  where hiveInfoId='" + hiveId + "'");
        List<Map<String, Object>> resMetaHiveSql = MysqlDBUtils.queryWebDB(hql.toString());
        hql.setLength(0);
        // NOTE(review): only single quotes are escaped; a backslash at the end of
        // the generated DDL would still break the statement — a parameterized
        // query would be safer if MysqlDBUtils supports one.
        if (resMetaHiveSql.isEmpty()) {
            hql.append("insert into meta_hive_sql (hiveSql,hiveInfoId) values('" + resHql.toString().replace("'", "\\'") + "','" + hiveId + "')");
        } else {
            hql.append("update meta_hive_sql set hiveSql='" + resHql.toString().replace("'", "\\'") + "' where  hiveInfoId='" + hiveId + "'");
        }
        MysqlDBUtils.updateWebDB(hql.toString());
    }
}
